package com.xiaofan

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


/**
 * Output record of the UV (unique visitor) statistics.
 *
 * @param windowEnd end timestamp of the window this count was computed for
 * @param count     number of distinct user ids seen in that window
 */
case class UvCount(
  windowEnd: Long,
  count: Long
)


/**
 * Flink job that counts unique visitors (UV) per hour of event time.
 *
 * It reads `UserBehavior` records from a CSV file and keeps only `"pv"` events.
 * It then emits one [[UvCount]] per 1-hour event-time window over the whole
 * (un-keyed) stream.
 */
object UniqueVisitor {

  /** Input file used when no path is given on the command line (the original hard-coded location). */
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv"

  /**
   * Builds and runs the UV job.
   *
   * @param args optional; if present, `args(0)` is used as the input CSV path,
   *             otherwise [[DefaultInputPath]] is read
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Allow the input location to be supplied at launch instead of being tied to one machine.
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to be: userId,itemId,categoryId,behavior,timestamp
    val dataStream: DataStream[UserBehavior] = inputStream
      .map { line =>
        val fields: Array[String] = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
      }
      // The timestamp field is scaled by 1000 (presumably seconds -> Flink's milliseconds).
      // Ascending timestamps are assumed, i.e. the file is expected to be sorted by time.
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val uvStream: DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      // No keyBy: open a 1-hour window directly on the whole DataStream.
      .timeWindowAll(Time.hours(1))
      .apply(new UvCountResult)

    uvStream.print()

    env.execute("uv test!")
  }
}


/**
 * Full-window function that counts the distinct users in one window.
 *
 * `apply` receives the window's elements as `input`. It collects their `userId`s
 * into a set and emits one [[UvCount]] holding the window's end timestamp
 * (`window.getEnd`) and the set's size.
 *
 * NOTE(review): a full-window function is handed every element of the window, so memory
 * grows with the window's traffic; for large windows consider an incremental aggregate.
 */
class UvCountResult extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    // Build the distinct-user set in a single pass rather than re-assigning an
    // immutable Set through a `var` for every element.
    val distinctUsers: Int = input.iterator.map(_.userId).toSet.size

    out.collect(UvCount(window.getEnd, distinctUsers.toLong))
  }
}
